---
title: "Realtime Streams"
sidebarTitle: "Streams"
description: "Stream data in realtime from your Trigger.dev tasks to your frontend or backend applications."
---

Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow.

<Note>
  Streams v2 requires SDK version **4.1.0 or later**. Make sure to upgrade your `@trigger.dev/sdk`
  and `@trigger.dev/react-hooks` packages to use these features. If you're on an earlier version,
  see the [metadata.stream()](/runs/metadata#stream) documentation.
</Note>

## Overview

Streams v2 is a major upgrade that provides:

- **Unlimited stream length** (previously capped at 2000 chunks)
- **Unlimited active streams per run** (previously 5)
- **Improved reliability** with automatic resumption on connection loss
- **28-day stream retention** (previously 1 day)
- **Multiple client streams** can pipe to a single stream
- **Enhanced dashboard visibility** for viewing stream data in real-time

## Enabling Streams v2

Streams v2 is **automatically enabled** when you trigger runs with SDK version 4.1.0 or later. If you aren't triggering via the SDK, you'll need to explicitly enable v2 streams by setting the `x-trigger-realtime-streams-version` header to `v2` when triggering the task.
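For callers that trigger tasks over HTTP rather than through the SDK, the header can be attached to the request. The sketch below only builds the request options and does not send anything. The `buildTriggerRequest` helper, the endpoint path, the body shape, and the example key are assumptions for illustration, so check them against the REST API reference before relying on them.

```ts
// Hypothetical helper: builds (but does not send) a trigger request with the v2 streams header.
type TriggerRequest = {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
};

function buildTriggerRequest(
  taskId: string,
  payload: unknown,
  secretKey: string,
  baseURL = "https://api.trigger.dev" // assumption: default API URL
): TriggerRequest {
  return {
    // Assumption: REST trigger endpoint path; verify against the API reference.
    url: `${baseURL}/api/v1/tasks/${encodeURIComponent(taskId)}/trigger`,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${secretKey}`,
        "Content-Type": "application/json",
        // Opt in to Streams v2 when not triggering via the SDK
        "x-trigger-realtime-streams-version": "v2",
      },
      body: JSON.stringify({ payload }),
    },
  };
}

const request = buildTriggerRequest("stream-task", { prompt: "Hello" }, "tr_dev_example");
// When ready to send: await fetch(request.url, request.init);
```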

If you'd like to **opt out** of v2 streams, you can do so in one of the following two ways:

### Option 1: Configure the SDK

```ts
import { auth } from "@trigger.dev/sdk";

auth.configure({
  future: {
    v2RealtimeStreams: false,
  },
});
```

### Option 2: Environment Variable

Set the environment variable `TRIGGER_V2_REALTIME_STREAMS=0` in your backend environment (where you trigger tasks).

## Limits Comparison

| Limit                            | Streams v1  | Streams v2 |
| -------------------------------- | ----------- | ---------- |
| Maximum stream length            | 2000 chunks | Unlimited  |
| Number of active streams per run | 5           | Unlimited  |
| Maximum streams per run          | 10          | Unlimited  |
| Maximum stream TTL               | 1 day       | 28 days    |
| Maximum stream size              | 10 MB       | 300 MiB    |
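To get a rough feel for the size limit, the sketch below estimates how many bytes a set of chunks would occupy and compares that to the 300 MiB figure from the table. It assumes size is measured as the UTF-8 byte length of each chunk's JSON encoding, which this page does not specify; `estimateStreamBytes` and `fitsV2SizeLimit` are hypothetical helpers, not SDK functions.

```ts
const MIB = 1024 * 1024;
const V2_MAX_STREAM_BYTES = 300 * MIB; // 314,572,800 bytes

// Assumption: approximate each chunk's size as the UTF-8 length of its JSON encoding.
function estimateStreamBytes(chunks: unknown[]): number {
  const encoder = new TextEncoder();
  return chunks.reduce<number>(
    (total, chunk) => total + encoder.encode(JSON.stringify(chunk)).length,
    0
  );
}

function fitsV2SizeLimit(chunks: unknown[]): boolean {
  return estimateStreamBytes(chunks) <= V2_MAX_STREAM_BYTES;
}

const sample = [{ step: "Processing", percent: 50 }, "Step 1 complete"];
console.log(estimateStreamBytes(sample), fitsV2SizeLimit(sample));
```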

## Quick Start

The recommended workflow for using Realtime Streams v2:

1. **Define your streams** in a shared location using `streams.define()`
2. **Use the defined stream** in your tasks with `.pipe()`, `.append()`, or `.writer()`
3. **Read from the stream** using `.read()` or the `useRealtimeStream` hook in React

This approach gives you full type safety, better code organization, and easier maintenance as your application grows.

## Defining Typed Streams (Recommended)

The recommended way to work with streams is to define them once with `streams.define()`. This allows you to specify the chunk type and stream ID in one place, and then reuse that definition throughout your codebase with full type safety.

### Creating a Defined Stream

Define your streams in a shared location (like `app/streams.ts` or `trigger/streams.ts`):

```ts
import { streams, InferStreamType } from "@trigger.dev/sdk";

// Define a stream with a specific type
export const aiStream = streams.define<string>({
  id: "ai-output",
});

// Export the type for use in frontend components
export type AIStreamPart = InferStreamType<typeof aiStream>;
```

You can define streams for any JSON-serializable type:

```ts
import { streams, InferStreamType } from "@trigger.dev/sdk";
import { UIMessageChunk } from "ai";

// Stream for AI UI message chunks
export const aiStream = streams.define<UIMessageChunk>({
  id: "ai",
});

// Stream for progress updates
export const progressStream = streams.define<{ step: string; percent: number }>({
  id: "progress",
});

// Stream for simple text
export const logStream = streams.define<string>({
  id: "logs",
});

// Export types
export type AIStreamPart = InferStreamType<typeof aiStream>;
export type ProgressStreamPart = InferStreamType<typeof progressStream>;
export type LogStreamPart = InferStreamType<typeof logStream>;
```
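Because a chunk can be any JSON-serializable type, one option is a discriminated union that carries several kinds of events on a single stream. The sketch below is plain TypeScript with no SDK calls; `TaskEvent` and `summarize` are hypothetical names, and a union like this would be supplied as the type argument to `streams.define`.

```ts
// Hypothetical chunk type: each variant is JSON-serializable and tagged by `type`.
type TaskEvent =
  | { type: "log"; message: string }
  | { type: "progress"; step: string; percent: number }
  | { type: "done"; durationMs: number };

// Fold a list of received chunks into a small summary for display.
function summarize(events: TaskEvent[]): { logs: string[]; percent: number; done: boolean } {
  const result = { logs: [] as string[], percent: 0, done: false };
  for (const item of events) {
    switch (item.type) {
      case "log":
        result.logs.push(item.message);
        break;
      case "progress":
        result.percent = item.percent; // keep the latest progress value
        break;
      case "done":
        result.done = true;
        break;
    }
  }
  return result;
}

const eventSummary = summarize([
  { type: "log", message: "Processing started" },
  { type: "progress", step: "Processing", percent: 50 },
  { type: "done", durationMs: 1200 },
]);
console.log(eventSummary);
```

The `switch` on `type` lets the compiler narrow each variant, so reading `item.percent` on a log event would be a type error.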

### Using Defined Streams in Tasks

Once defined, you can use all stream methods on your defined stream:

```ts
import { task } from "@trigger.dev/sdk";
import { aiStream } from "./streams";

export const streamTask = task({
  id: "stream-task",
  run: async (payload: { prompt: string }) => {
    // Get a stream from an AI service, database, etc.
    const stream = await getAIStream(payload.prompt);

    // Pipe the stream using your defined stream
    const { stream: readableStream, waitUntilComplete } = aiStream.pipe(stream);

    // Option A: Iterate over the stream locally
    for await (const chunk of readableStream) {
      console.log("Received chunk:", chunk);
    }

    // Option B: Wait for the stream to complete
    await waitUntilComplete();

    return { message: "Stream completed" };
  },
});
```

#### Reading from a Stream

Use the defined stream's `read()` method to consume data from anywhere (frontend, backend, or another task):

```ts
import { aiStream } from "./streams";

const stream = await aiStream.read(runId);

for await (const chunk of stream) {
  console.log(chunk); // chunk is typed as the stream's chunk type
}
```

With options:

```ts
const stream = await aiStream.read(runId, {
  timeoutInSeconds: 60, // Stop if no data for 60 seconds
  startIndex: 10, // Start reading from chunk index 10
});
```
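One use for `startIndex` is picking up where an earlier reader stopped, for example after a page reload. The sketch below counts chunks while consuming any async iterable, so the returned value could be passed as `startIndex` on a later `read()` call. It assumes `startIndex` is a zero-based chunk offset, which is not stated explicitly here; `consumeFrom` and `fakeStream` are hypothetical stand-ins rather than SDK functions.

```ts
// Consume an async iterable and return the index the next reader should start from.
async function consumeFrom<T>(
  stream: AsyncIterable<T>,
  startIndex: number,
  onChunk: (chunk: T, index: number) => void
): Promise<number> {
  let nextIndex = startIndex;
  for await (const chunk of stream) {
    onChunk(chunk, nextIndex);
    nextIndex += 1;
  }
  return nextIndex;
}

// Stand-in for a read call with `startIndex`: yields stored chunks from an offset.
async function* fakeStream(chunks: string[], startIndex = 0): AsyncGenerator<string> {
  for (const chunk of chunks.slice(startIndex)) {
    yield chunk;
  }
}

const stored = ["a", "b", "c", "d"];
const seen: string[] = [];

// First reader sees only the first two chunks, then a later reader resumes
// from the saved index instead of replaying everything.
const resumeDemo = consumeFrom(fakeStream(stored.slice(0, 2)), 0, (c) => seen.push(c)).then(
  (next) => consumeFrom(fakeStream(stored, next), next, (c) => seen.push(c))
);
```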

#### Appending to a Stream

Use the defined stream's `append()` method to add a single chunk:

```ts
import { task } from "@trigger.dev/sdk";
import { aiStream, progressStream, logStream } from "./streams";

export const appendTask = task({
  id: "append-task",
  run: async (payload) => {
    // Append to different streams with full type safety
    await logStream.append("Processing started");
    await progressStream.append({ step: "Initialization", percent: 0 });

    // Do some work...

    await progressStream.append({ step: "Processing", percent: 50 });
    await logStream.append("Step 1 complete");

    // Do more work...

    await progressStream.append({ step: "Complete", percent: 100 });
    await logStream.append("All steps complete");
  },
});
```

#### Writing Multiple Chunks

Use the defined stream's `writer()` method for more complex stream writing:

```ts
import { task } from "@trigger.dev/sdk";
import { logStream } from "./streams";

export const writerTask = task({
  id: "writer-task",
  run: async (payload) => {
    const { waitUntilComplete } = logStream.writer({
      execute: ({ write, merge }) => {
        // Write individual chunks
        write("Chunk 1");
        write("Chunk 2");

        // Merge another stream
        const additionalStream = ReadableStream.from(["Chunk 3", "Chunk 4", "Chunk 5"]);
        merge(additionalStream);
      },
    });

    await waitUntilComplete();
  },
});
```
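`ReadableStream.from()` is a relatively recent addition and may not exist in every runtime. A minimal sketch of an equivalent helper built on the standard `ReadableStream` constructor follows; `streamFromArray` and `collect` are hypothetical names, not SDK functions.

```ts
// Build a ReadableStream from an array without relying on ReadableStream.from().
function streamFromArray<T>(items: T[]): ReadableStream<T> {
  let index = 0;
  return new ReadableStream<T>({
    pull(controller) {
      if (index < items.length) {
        controller.enqueue(items[index++]);
      } else {
        controller.close();
      }
    },
  });
}

// Drain a stream with a reader; useful for checking what a stream produces.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

const collected = collect(streamFromArray(["Chunk 3", "Chunk 4", "Chunk 5"]));
```

A stream built this way could be handed to `merge()` in place of the `ReadableStream.from(...)` call.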

### Using Defined Streams in React

Defined streams work seamlessly with the `useRealtimeStream` hook:

```tsx
"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";

export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
  // Pass the defined stream directly - full type safety!
  const { parts, error } = useRealtimeStream(aiStream, runId, {
    accessToken,
    timeoutInSeconds: 600,
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Loading...</div>;

  return (
    <div>
      {parts.map((part, i) => (
        <span key={i}>{part}</span>
      ))}
    </div>
  );
}
```

## Direct Stream Methods (Without Defining)

<Warning>
  We strongly recommend using `streams.define()` instead of direct methods. Defined streams provide
  better organization, full type safety, and make it easier to maintain your codebase as it grows.
</Warning>

If you have a specific reason to avoid defined streams, you can use stream methods directly by specifying the stream key each time.

### Direct Piping

```ts
import { streams, task } from "@trigger.dev/sdk";

export const directStreamTask = task({
  id: "direct-stream",
  run: async (payload: { prompt: string }) => {
    const stream = await getAIStream(payload.prompt);

    // Specify the stream key directly
    const { stream: readableStream, waitUntilComplete } = streams.pipe("ai-output", stream);

    await waitUntilComplete();
  },
});
```

### Direct Reading

```ts
import { streams } from "@trigger.dev/sdk";

// Specify the stream key when reading
const stream = await streams.read(runId, "ai-output");

for await (const chunk of stream) {
  console.log(chunk);
}
```

### Direct Appending

```ts
import { streams, task } from "@trigger.dev/sdk";

export const directAppendTask = task({
  id: "direct-append",
  run: async (payload) => {
    // Specify the stream key each time
    await streams.append("logs", "Processing started");
    await streams.append("progress", "50%");
    await streams.append("logs", "Complete");
  },
});
```

### Direct Writing

```ts
import { streams, task } from "@trigger.dev/sdk";

export const directWriterTask = task({
  id: "direct-writer",
  run: async (payload) => {
    const { waitUntilComplete } = streams.writer("output", {
      execute: ({ write, merge }) => {
        write("Chunk 1");
        write("Chunk 2");
      },
    });

    await waitUntilComplete();
  },
});
```

## Default Stream

Every run has a "default" stream, allowing you to skip the stream key entirely. This is useful for simple cases where you only need one stream per run.

Using direct methods:

```ts
import { streams, task } from "@trigger.dev/sdk";

export const defaultStreamTask = task({
  id: "default-stream",
  run: async (payload) => {
    const stream = getDataStream();

    // No stream key needed - uses "default"
    const { waitUntilComplete } = streams.pipe(stream);

    await waitUntilComplete();
  },
});

// Reading from the default stream
const readStream = await streams.read(runId);
```

## Targeting Different Runs

You can pipe streams to parent, root, or any other run using the `target` option. This works with both defined streams and direct methods.

### With Defined Streams

```ts
import { task } from "@trigger.dev/sdk";
import { logStream } from "./streams";

export const childTask = task({
  id: "child-task",
  run: async (payload, { ctx }) => {
    const stream = getDataStream();
    // The calls below show each target option; in practice, pipe a given source stream only once

    // Pipe to parent run
    logStream.pipe(stream, { target: "parent" });

    // Pipe to root run
    logStream.pipe(stream, { target: "root" });

    // Pipe to self (default behavior)
    logStream.pipe(stream, { target: "self" });

    // Pipe to a specific run ID
    logStream.pipe(stream, { target: payload.otherRunId });
  },
});
```

### With Direct Methods

```ts
import { streams, task } from "@trigger.dev/sdk";

export const childTask = task({
  id: "child-task",
  run: async (payload, { ctx }) => {
    const stream = getDataStream();
    // The calls below show each target option; in practice, pipe a given source stream only once

    // Pipe to parent run
    streams.pipe("output", stream, { target: "parent" });

    // Pipe to root run
    streams.pipe("output", stream, { target: "root" });

    // Pipe to a specific run ID
    streams.pipe("output", stream, { target: payload.otherRunId });
  },
});
```

## Streaming from Outside a Task

If you specify a `target` run ID, you can pipe streams from anywhere (like a Next.js API route):

```ts
import { streams } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";

export async function POST(req: Request) {
  const { messages, runId } = await req.json();

  const result = streamText({
    model: openai("gpt-4o"),
    messages,
  });

  // Pipe AI stream to a Trigger.dev run
  const { stream } = streams.pipe("ai-stream", result.toUIMessageStream(), {
    target: runId,
  });

  return new Response(stream as any, {
    headers: { "Content-Type": "text/event-stream" },
  });
}
```

## React Hook

Use the `useRealtimeStream` hook to subscribe to streams in your React components.

### With Defined Streams (Recommended)

```tsx
"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";

export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
  // Pass the defined stream directly for full type safety
  const { parts, error } = useRealtimeStream(aiStream, runId, {
    accessToken,
    timeoutInSeconds: 600,
    onData: (chunk) => {
      console.log("New chunk:", chunk); // chunk is typed!
    },
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Loading...</div>;

  return (
    <div>
      {parts.map((part, i) => (
        <span key={i}>{part}</span>
      ))}
    </div>
  );
}
```

### With Direct Stream Keys

If you prefer not to use defined streams, you can specify the stream key directly:

```tsx
"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";

export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
  const { parts, error } = useRealtimeStream<string>(runId, "ai-output", {
    accessToken,
    timeoutInSeconds: 600,
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Loading...</div>;

  return (
    <div>
      {parts.map((part, i) => (
        <span key={i}>{part}</span>
      ))}
    </div>
  );
}
```

### Using Default Stream

```tsx
// Omit stream key to use the default stream
const { parts, error } = useRealtimeStream<string>(runId, {
  accessToken,
});
```

### Hook Options

```tsx
const { parts, error } = useRealtimeStream(streamDef, runId, {
  accessToken: "pk_...", // Required: Public access token
  baseURL: "https://api.trigger.dev", // Optional: Custom API URL
  timeoutInSeconds: 60, // Optional: Timeout (default: 60)
  startIndex: 0, // Optional: Start from specific chunk
  throttleInMs: 16, // Optional: Throttle updates (default: 16ms)
  onData: (chunk) => {}, // Optional: Callback for each chunk
});
```
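To illustrate what `throttleInMs` is for, here is a minimal, framework-free sketch of throttling: chunks are buffered and handed to a listener at most once per interval. This is not the hook's implementation; `createThrottledBuffer` and its injected clock are hypothetical, chosen so the behavior can be followed without real timers. The 16 ms default is roughly one frame at 60 fps.

```ts
// Buffer incoming chunks and flush them at most once every `intervalMs`.
function createThrottledBuffer<T>(
  intervalMs: number,
  onFlush: (batch: T[]) => void,
  now: () => number = Date.now // injectable clock for deterministic demos
) {
  let pending: T[] = [];
  let lastFlush = -Infinity;

  // Call once more when the stream ends so trailing chunks are not dropped.
  const flush = (): void => {
    if (pending.length === 0) return;
    onFlush(pending);
    pending = [];
    lastFlush = now();
  };

  const push = (chunk: T): void => {
    pending.push(chunk);
    if (now() - lastFlush >= intervalMs) flush();
  };

  return { push, flush };
}

let fakeTime = 0;
const batches: string[][] = [];
const throttled = createThrottledBuffer<string>(16, (b) => batches.push(b), () => fakeTime);

throttled.push("a"); // t=0: first chunk flushes immediately
fakeTime = 5;
throttled.push("b"); // t=5: buffered
fakeTime = 10;
throttled.push("c"); // t=10: buffered
fakeTime = 20;
throttled.push("d"); // t=20: interval elapsed, flushes b, c, and d together
throttled.flush(); // nothing pending, so no extra batch
```

Four chunks arrive but the listener runs twice, which is the effect throttling has on re-renders.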

## Complete Example: AI Streaming

### Define the stream

```ts
// app/streams.ts
import { streams, InferStreamType } from "@trigger.dev/sdk";
import { UIMessageChunk } from "ai";

export const aiStream = streams.define<UIMessageChunk>({
  id: "ai",
});

export type AIStreamPart = InferStreamType<typeof aiStream>;
```

### Create the task

```ts
// trigger/ai-task.ts
import { task } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
import { aiStream } from "@/app/streams";

export const generateAI = task({
  id: "generate-ai",
  run: async (payload: { prompt: string }) => {
    const result = streamText({
      model: openai("gpt-4o"),
      prompt: payload.prompt,
    });

    const { waitUntilComplete } = aiStream.pipe(result.toUIMessageStream());

    await waitUntilComplete();

    return { success: true };
  },
});
```

### Frontend component

```tsx
// components/ai-stream.tsx
"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";

export function AIStream({ accessToken, runId }: { accessToken: string; runId: string }) {
  const { parts, error } = useRealtimeStream(aiStream, runId, {
    accessToken,
    timeoutInSeconds: 300,
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Loading...</div>;

  return (
    <div className="prose">
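      {/* UIMessageChunk values are objects; render only text deltas (assumes AI SDK v5 chunk shape) */}
      {parts.map((part, i) =>
        part.type === "text-delta" ? <span key={i}>{part.delta}</span> : null
      )}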
    </div>
  );
}
```
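`UIMessageChunk` values are objects rather than strings, so a component usually needs to extract displayable text from them. The sketch below assumes text arrives in chunks shaped like `{ type: "text-delta", delta: string }`, as in recent versions of the AI SDK. The local `ChunkLike` type is a simplified stand-in for `UIMessageChunk` and `textFromChunks` is a hypothetical helper, so check both against the version of `ai` you have installed.

```ts
// Simplified stand-in for the AI SDK's UIMessageChunk (assumption: text-delta carries `delta`).
type ChunkLike =
  | { type: "text-start"; id: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "text-end"; id: string }
  | { type: "finish" };

// Concatenate only the text deltas, ignoring lifecycle chunks.
function textFromChunks(chunks: ChunkLike[]): string {
  let output = "";
  for (const chunk of chunks) {
    if (chunk.type === "text-delta") {
      output += chunk.delta;
    }
  }
  return output;
}

const renderedText = textFromChunks([
  { type: "text-start", id: "t1" },
  { type: "text-delta", id: "t1", delta: "Hello, " },
  { type: "text-delta", id: "t1", delta: "world" },
  { type: "text-end", id: "t1" },
  { type: "finish" },
]);
console.log(renderedText);
```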

## Migration from v1

If you're using the old `metadata.stream()` API, here's how to migrate to the recommended v2 approach:

### Step 1: Define Your Streams

Create a shared streams definition file:

```ts
// app/streams.ts or trigger/streams.ts
import { streams, InferStreamType } from "@trigger.dev/sdk";

export const myStream = streams.define<string>({
  id: "my-stream",
});

export type MyStreamPart = InferStreamType<typeof myStream>;
```

### Step 2: Update Your Tasks

Replace `metadata.stream()` with the defined stream's `pipe()` method:

```ts
// Before (v1)
import { metadata, task } from "@trigger.dev/sdk";

export const myTask = task({
  id: "my-task",
  run: async (payload) => {
    const stream = getDataStream();
    await metadata.stream("my-stream", stream);
  },
});
```

```ts
// After (v2 - Recommended)
import { task } from "@trigger.dev/sdk";
import { myStream } from "./streams";

export const myTask = task({
  id: "my-task",
  run: async (payload) => {
    const stream = getDataStream();

    // Don't await - returns immediately
    const { waitUntilComplete } = myStream.pipe(stream);

    // Optionally wait for completion
    await waitUntilComplete();
  },
});
```

### Step 3: Update Your Frontend

Use the defined stream with `useRealtimeStream`:

```tsx
// Before
const { parts, error } = useRealtimeStream<string>(runId, "my-stream", {
  accessToken,
});
```

```tsx
// After
import { myStream } from "@/app/streams";

const { parts, error } = useRealtimeStream(myStream, runId, {
  accessToken,
});
```

### Alternative: Direct Methods (Not Recommended)

If you prefer not to use defined streams, you can use direct methods:

```ts
import { streams, task } from "@trigger.dev/sdk";

export const myTask = task({
  id: "my-task",
  run: async (payload) => {
    const stream = getDataStream();
    const { waitUntilComplete } = streams.pipe("my-stream", stream);
    await waitUntilComplete();
  },
});
```

## Reliability Features

Streams v2 includes automatic reliability improvements:

- **Automatic resumption**: If a connection is lost, both appending and reading will automatically resume from the last successful chunk
- **No data loss**: Network issues won't cause stream data to be lost
- **Idempotent operations**: Duplicate chunks are automatically handled

These improvements happen automatically - no code changes needed.
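As a mental model only (this is not a description of how Trigger.dev implements it), resumption and idempotency can be pictured as a receiver that tracks the next expected chunk index and ignores anything it has already accepted. `IndexedChunk` and `createReceiver` are hypothetical names.

```ts
type IndexedChunk<T> = { index: number; value: T };

function createReceiver<T>() {
  const accepted: T[] = [];
  return {
    // Returns true if the chunk was new, false if it was a duplicate or out of order.
    receive(chunk: IndexedChunk<T>): boolean {
      if (chunk.index !== accepted.length) return false;
      accepted.push(chunk.value);
      return true;
    },
    // After a dropped connection, a sender would resume from this index.
    nextIndex: (): number => accepted.length,
    values: (): readonly T[] => accepted,
  };
}

const receiver = createReceiver<string>();
receiver.receive({ index: 0, value: "a" });
receiver.receive({ index: 1, value: "b" });
// The connection drops; the sender retries chunk 1, then continues with chunk 2.
const duplicateAccepted = receiver.receive({ index: 1, value: "b" });
receiver.receive({ index: 2, value: "c" });
console.log(duplicateAccepted, receiver.values());
```

Retrying a chunk is harmless in this model because the receiver's state only advances when the index matches what it expects next.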

## Dashboard Integration

Streams are now visible in the Trigger.dev dashboard, allowing you to:

- View stream data in real-time as it's generated
- Inspect historical stream data for completed runs
- Debug streaming issues with full visibility into chunk delivery

<video src="https://content.trigger.dev/streams-v2-dashboard.mp4" controls muted autoPlay loop />

## Best Practices

1. **Always use `streams.define()`**: Define your streams in a shared location for better organization, type safety, and code reusability. This is the recommended approach for all streams.
2. **Export stream types**: Use `InferStreamType` to export types for your frontend components
3. **Handle errors gracefully**: Always check for errors when reading streams in your UI
4. **Set appropriate timeouts**: Adjust `timeoutInSeconds` based on your use case (AI completions may need longer timeouts)
5. **Target parent runs**: When orchestrating with child tasks, pipe to parent runs for easier consumption
6. **Throttle frontend updates**: Use `throttleInMs` in `useRealtimeStream` to prevent excessive re-renders
7. **Use descriptive stream IDs**: Choose clear, descriptive IDs like `"ai-output"` or `"progress"` instead of generic names

## Troubleshooting

### Stream not appearing in dashboard

- Ensure Streams v2 is enabled: you're on SDK 4.1.0 or later and haven't opted out via the future flag or environment variable
- Verify your task is actually writing to the stream
- Check that the stream key matches between writing and reading

### Stream timeout errors

- Increase `timeoutInSeconds` in your `read()` or `useRealtimeStream()` calls
- Ensure your stream source is actively producing data
- Check network connectivity between your application and Trigger.dev

### Missing chunks

- With v2, automatic resumption should prevent chunks from being lost
- Verify you're reading from the correct stream key
- Check the `startIndex` option if you're not seeing expected chunks
